In [1]:
import pandas as pd
data = pd.read_parquet('conector_latency_and_true_price.parquet')
2- Data overview (5 minutes downsample)¶
In [2]:
import datetime
import plotly.express as px
downsampled = (
data.set_index("timestamp", drop=False)
.groupby(["connector", "symbol"])
.resample("5T")
.last()
)
for symbol in data.symbol.unique():
symbol_data = downsampled[downsampled.symbol == symbol]
display(
px.line(
symbol_data,
x="timestamp",
y="price",
color="connector",
title=symbol
)
)
3- Latency¶
In [3]:
import plotly.express as px
# Per-connector latency
data["connector_latency_ms"] = (
data["fetch_timestamp"] - data["timestamp"]
).dt.total_seconds() * 1000
# display(
# px.box(
# data,
# x="connector",
# y="connector_latency_ms",
# log_y=True,
# title="Fetch latency by connector, log scale",
# labels={
# "connector": "Exchange",
# "connector_latency_ms": "Fetch latency (ms)",
# },
# points=False,
# width=800,
# height=400
# )
# )
# NOTE: this plot is not included because it's very large (in MB) as it includes all data points (px.box is pretty innefficient)
In [4]:
# DB insertion median over time
data["db_insertion_latency"] = (
data["update_timestamp"] - data["fetch_timestamp"]
).dt.total_seconds() * 1000
db_insertion_latency_resample = data.set_index("timestamp")[
"db_insertion_latency"
].resample("10T")
db_insertion_latency = pd.DataFrame(
{
"max": db_insertion_latency_resample.max(),
"percentile_95": db_insertion_latency_resample.quantile(0.95),
"median": db_insertion_latency_resample.median(),
}
)
fig = px.scatter(
db_insertion_latency,
x=db_insertion_latency.index,
y=db_insertion_latency.columns,
log_y=True
)
fig.update_layout(
title="Database insertion latencies over time",
yaxis_title="DB insertion latency (ms)",
xaxis_title="Timestamp (10min buckets)",
hovermode="x unified",
)
fig.update_traces(marker_line_width=0, selector=dict(type="bar"))
display(fig)
4- Analysis of CRV/USDT w/ aggregate algorithm¶
The algorithm uses the spread and frequency of ticker changes by connector to estimate the weight each connector should have. It then uses a weighted average of the last prices to output the true price.
In [5]:
SYMBOL = 'CRV/USDT'
from_index = datetime.datetime(2023, 8, 2, 15, 0)
to_index = datetime.datetime(2023, 8, 2, 16, 0)
data_symbol = data.loc[(data.symbol == SYMBOL) & (data.timestamp > from_index) & (data.timestamp < to_index)]
In [6]:
import numpy as np
weight_by_connector = {}
for connector in data_symbol.connector.unique():
connector_prices = data_symbol[data_symbol.connector == connector].copy()
connector_prices["price_diff"] = connector_prices["price"].diff().abs()
connector_prices = connector_prices.loc[connector_prices.price_diff != 0]
diff = connector_prices["price_diff"]
weight_by_connector[connector] = 1.0 / diff.apply(float).var()
# We use the server-provided timestamp for the real prices, but the update_timestamp
# (websocket latency + database insertion latency) to compute the true price.
# The true price therefore has a realistic latency that depends on the connector.
real_prices_by_connector = pd.pivot_table(
data=data_symbol,
index="timestamp",
columns="connector",
values="price"
).ffill()
latency_prices_by_connector = pd.pivot_table(
data=data_symbol,
index="update_timestamp",
columns="connector",
values="price"
).ffill()
def row_avg(row):
row = row.dropna()
weights = pd.Series(
[weight_by_connector[connector] for connector in row.index], index=row.index
)
valid_weights = weights[~weights.isnull()]
# Remove nan weights
row = row[~weights.isnull()]
return np.average(row.to_numpy(), weights=valid_weights)
latency_true_price = latency_prices_by_connector.apply(row_avg, axis=1)
In [7]:
latency_true_price_fig = px.line(
latency_true_price,
x=latency_true_price.index,
y=latency_true_price,
title=SYMBOL
)
connectors_fig = px.line(
real_prices_by_connector,
x=real_prices_by_connector.index,
y=real_prices_by_connector.columns,
)
def hex_string_to_rgba(hex_string: str, alpha: str):
hex_string = hex_string.lstrip("#")
rgb = tuple(int(hex_string[i : i + 2], 16) for i in (0, 2, 4))
color_rgb_str = ", ".join([str(x) for x in rgb])
return f"rgba({color_rgb_str}, {alpha})"
for trace in connectors_fig.data:
color = trace.line.color
trace.line.color = hex_string_to_rgba(color, "0.2")
latency_true_price_fig.add_trace(trace)
display(latency_true_price_fig)